Security News
RubyGems.org Adds New Maintainer Role
RubyGems.org has added a new "maintainer" role that allows for publishing new versions of gems. This new permission type is aimed at improving security for gem owners and the service overall.
The p-queue package is a promise-based queue for Node.js that enables the execution of tasks in a controlled concurrency environment. It allows for rate-limiting, pausing, and resuming of tasks, and ensures that tasks are executed in a predictable manner.
Concurrency Control
This feature allows you to control the number of tasks that are run concurrently. In this example, the concurrency is set to 2, meaning that only two tasks will be processed at the same time.
const PQueue = require('p-queue').default;
const queue = new PQueue({concurrency: 2});
async function task(input) {
// Task implementation
}
queue.add(() => task('task 1'));
queue.add(() => task('task 2'));
queue.add(() => task('task 3'));
Rate Limiting
This feature allows you to limit the rate at which tasks are executed. In this example, the queue is configured to process a maximum of 2 tasks per 1000 milliseconds.
const PQueue = require('p-queue').default;
const queue = new PQueue({
interval: 1000,
intervalCap: 2
});
async function task(input) {
// Task implementation
}
for (let i = 0; i < 6; i++) {
queue.add(() => task(`task ${i + 1}`));
}
Pausing and Resuming
This feature allows you to pause the processing of tasks and resume it later. In this example, the queue is paused immediately after creation, tasks are added, and then the queue is resumed after a timeout.
const PQueue = require('p-queue').default;
const queue = new PQueue();
queue.pause();
async function task(input) {
// Task implementation
}
queue.add(() => task('task 1'));
queue.add(() => task('task 2'));
setTimeout(() => {
queue.start(); // Resume the queue after 2000ms
}, 2000);
Priority Queueing
This feature allows you to add tasks with a priority level. Tasks with a lower priority number are executed first. In this example, the 'high priority' task will be executed before the 'low priority' task.
const PQueue = require('p-queue').default;
const queue = new PQueue();
async function task(input) {
// Task implementation
}
queue.add(() => task('low priority'), {priority: 1});
queue.add(() => task('high priority'), {priority: 0});
The 'async' package provides a variety of functions for working with asynchronous JavaScript, including queue management. It is similar to p-queue but offers a broader set of utilities for asynchronous control flow.
Bottleneck is a lightweight and powerful rate limiting library for Node.js. It is similar to p-queue in its ability to rate limit tasks but also provides features like clustering support for distributed rate limiting.
Bull is a Redis-backed queue package for handling distributed jobs and messages in Node.js. It offers functionality similar to p-queue with additional features like repeatable jobs, delayed jobs, and job event listeners.
Promise queue with concurrency control
Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.
$ npm install p-queue
Here we run only one promise at the time. For example, set concurrency
to 4 to run four promises at the same time.
const {default: PQueue} = require('p-queue');
const got = require('got');
const queue = new PQueue({concurrency: 1});
(async () => {
await queue.add(() => got('https://sindresorhus.com'));
console.log('Done: sindresorhus.com');
})();
(async () => {
await queue.add(() => got('https://avajs.dev'));
console.log('Done: avajs.dev');
})();
(async () => {
const task = await getUnicornTask();
await queue.add(task);
console.log('Done: Unicorn task');
})();
Returns a new queue
instance, which is an EventEmitter3
subclass.
Type: object
Type: number
Default: Infinity
Minimum: 1
Concurrency limit.
Type: number
Per-operation timeout in milliseconds. Operations fulfill once timeout
elapses if they haven't already.
Type: boolean
Default: false
Whether or not a timeout is considered an exception.
Type: boolean
Default: true
Whether queue tasks within concurrency limit, are auto-executed as soon as they're added.
Type: Function
Class with a enqueue
and dequeue
method, and a size
getter. See the Custom QueueClass section.
Type: number
Default: Infinity
Minimum: 1
The max number of runs in the given interval of time.
Type: number
Default: 0
Minimum: 0
The length of time in milliseconds before the interval count resets. Must be finite.
Type: boolean
Default: false
If true
, specifies that any pending Promises, should be carried over into the next interval and counted against the intervalCap
. If false
, any of those pending Promises will not count towards the next intervalCap
.
PQueue
instance.
Adds a sync or async task to the queue. Always returns a promise.
Type: Function
Promise-returning/async function.
Type: object
Type: number
Default: 0
Priority of operation. Operations with greater priority will be scheduled first.
Same as .add()
, but accepts an array of sync or async functions and returns a promise that resolves when all functions are resolved.
Put queue execution on hold.
Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via options.autoStart = false
or by .pause()
method.)
Returns this
(the instance).
Returns a promise that settles when the queue becomes empty.
Can be called multiple times. Useful if you for example add additional items at a later time.
Returns a promise that settles when the queue becomes empty, and all promises have completed; queue.size === 0 && queue.pending === 0
.
The difference with .onEmpty
is that .onIdle
guarantees that all work from the queue has finished. .onEmpty
merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
Clear the queue.
Size of the queue.
Size of the queue, filtered by the given options.
For example, this can be used to find the number of items remaining in the queue with a specific priority level.
const queue = new PQueue();
queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 0});
queue.add(async () => '🦄', {priority: 1});
console.log(queue.sizeBy({priority: 1}));
//=> 2
console.log(queue.sizeBy({priority: 0}));
//=> 1
Number of pending promises.
Whether the queue is currently paused.
Emitted as each item is processed in the queue for the purpose of tracking progress.
const delay = require('delay');
const {default: PQueue} = require('p-queue');
const queue = new PQueue({concurrency: 2});
let count = 0;
queue.on('active', () => {
console.log(`Working on item #${++count}. Size: ${queue.size} Pending: ${queue.pending}`);
});
queue.add(() => Promise.resolve());
queue.add(() => delay(2000));
queue.add(() => Promise.resolve());
queue.add(() => Promise.resolve());
queue.add(() => delay(500));
Emitted every time the queue becomes empty and all promises have completed; queue.size === 0 && queue.pending === 0
.
const delay = require('delay');
const {default: PQueue} = require('p-queue');
const queue = new PQueue();
queue.on('idle', () => {
console.log(`Queue is idle. Size: ${queue.size} Pending: ${queue.pending}`);
});
const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));
await job1;
await job2;
// => 'Queue is idle. Size: 0 Pending: 0'
await queue.add(() => delay(600));
// => 'Queue is idle. Size: 0 Pending: 0'
The idle
event is emitted every time the queue reaches an idle state. On the other hand, the promise the onIdle()
function returns resolves once the queue becomes idle instead of every time the queue is idle.
Emitted every time the add method is called and the number of pending or queued tasks is increased.
Emitted every time a task is completed and the number of pending or queued tasks is decreased.
const delay = require('delay');
const {default: PQueue} = require('p-queue');
const queue = new PQueue();
queue.on('add', () => {
console.log(`Task is added. Size: ${queue.size} Pending: ${queue.pending}`);
});
queue.on('next', () => {
console.log(`Task is completed. Size: ${queue.size} Pending: ${queue.pending}`);
});
const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));
await job1;
await job2;
//=> 'Task is added. Size: 0 Pending: 1'
//=> 'Task is added. Size: 0 Pending: 2'
await queue.add(() => delay(600));
//=> 'Task is completed. Size: 0 Pending: 1'
//=> 'Task is completed. Size: 0 Pending: 0'
A more advanced example to help you understand the flow.
const delay = require('delay');
const {default: PQueue} = require('p-queue');
const queue = new PQueue({concurrency: 1});
(async () => {
await delay(200);
console.log(`8. Pending promises: ${queue.pending}`);
//=> '8. Pending promises: 0'
(async () => {
await queue.add(async () => '🐙');
console.log('11. Resolved')
})();
console.log('9. Added 🐙');
console.log(`10. Pending promises: ${queue.pending}`);
//=> '10. Pending promises: 1'
await queue.onIdle();
console.log('12. All work is done');
})();
(async () => {
await queue.add(async () => '🦄');
console.log('5. Resolved')
})();
console.log('1. Added 🦄');
(async () => {
await queue.add(async () => '🐴');
console.log('6. Resolved')
})();
console.log('2. Added 🐴');
(async () => {
await queue.onEmpty();
console.log('7. Queue is empty');
})();
console.log(`3. Queue size: ${queue.size}`);
//=> '3. Queue size: 1`
console.log(`4. Pending promises: ${queue.pending}`);
//=> '4. Pending promises: 1'
$ node example.js
1. Added 🦄
2. Added 🐴
3. Queue size: 1
4. Pending promises: 1
5. Resolved 🦄
6. Resolved 🐴
7. Queue is empty
8. Pending promises: 0
9. Added 🐙
10. Pending promises: 1
11. Resolved 🐙
12. All work is done
For implementing more complex scheduling policies, you can provide a QueueClass in the options:
class QueueClass {
constructor() {
this._queue = [];
}
enqueue(run, options) {
this._queue.push(run);
}
dequeue() {
return this._queue.shift();
}
get size() {
return this._queue.length;
}
filter(options) {
return this._queue;
}
}
p-queue
will call corresponding methods to put and get operations from this queue.
FAQs
Promise queue with concurrency control
The npm package p-queue receives a total of 2,722,471 weekly downloads. As such, p-queue popularity was classified as popular.
We found that p-queue demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 2 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
RubyGems.org has added a new "maintainer" role that allows for publishing new versions of gems. This new permission type is aimed at improving security for gem owners and the service overall.
Security News
Node.js will be enforcing stricter semver-major PR policies a month before major releases to enhance stability and ensure reliable release candidates.
Security News
Research
Socket's threat research team has detected five malicious npm packages targeting Roblox developers, deploying malware to steal credentials and personal data.